/*
Copyright 2022 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package integration

import (
	"context"
	"flag"
	"fmt"
	"os"
	"strings"
	"sync"
	"testing"
	"time"

	topologyv1alpha2 "github.com/k8stopologyawareschedwg/noderesourcetopology-api/pkg/apis/topology/v1alpha2"
	"github.com/k8stopologyawareschedwg/podfingerprint"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/apimachinery/pkg/util/uuid"
	"k8s.io/apimachinery/pkg/util/wait"
	clientset "k8s.io/client-go/kubernetes"
	clientgoscheme "k8s.io/client-go/kubernetes/scheme"
	"k8s.io/klog/v2"
	"k8s.io/kubernetes/pkg/scheduler"
	"k8s.io/kubernetes/pkg/scheduler/apis/config"
	schedapi "k8s.io/kubernetes/pkg/scheduler/apis/config"
	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/defaultbinder"
	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/queuesort"
	fwkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime"
	st "k8s.io/kubernetes/pkg/scheduler/testing"

	ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"

	schedconfig "sigs.k8s.io/scheduler-plugins/apis/config"
	"sigs.k8s.io/scheduler-plugins/pkg/noderesourcetopology"
	"sigs.k8s.io/scheduler-plugins/pkg/noderesourcetopology/nodeconfig"
	"sigs.k8s.io/scheduler-plugins/test/util"
)

const (
	// defaultCacheResyncPeriodSeconds is the NRT cache resync interval
	// (in seconds) configured on the plugin for all tests in this file.
	defaultCacheResyncPeriodSeconds int64 = 5
	// anyNode is a wildcard for podDesc.expectedNode meaning "any node is fine".
	anyNode = "*"
	// discardReservedSchedulerName is the scheduler profile name used by the
	// test cases exercising the DiscardReservedNodes cache behavior.
	discardReservedSchedulerName = "discardReserved"
)

var (
	// schedVerbose is the klog verbosity passed to the scheduler under test;
	// overridable via the SCHED_PLUGINS_TEST_VERBOSE environment variable (see init).
	schedVerbose = "0"
)

// podDesc describes one pod-shaped step of a test case: either a pod to
// create (with the resources it requests and the node it is expected to land
// on) or, when isDelete is set, a previously created pod to delete.
type podDesc struct {
	// schedulerName selects the scheduler profile; empty means the default profile.
	schedulerName string
	// podName is the pod name, also used to match delete steps to create steps.
	podName string
	// isGuaranteed selects Guaranteed QoS (resources set as limits) vs
	// Burstable QoS (resources set as requests) in SetupPod.
	isGuaranteed bool
	// isDelete marks this entry as a deletion step; SetupPod is never called
	// for such entries, so pod stays nil.
	isDelete bool
	// resourcesMap is the resource name -> quantity map for a single container.
	resourcesMap map[string]string
	// multiResourcesMap holds per-container resource maps for multi-container pods.
	multiResourcesMap []map[string]string
	// expectedNode is the node the pod must land on; anyNode ("*") accepts any
	// node, and "" means the pod is expected to stay pending.
	expectedNode string
	// autogenerated
	pod *corev1.Pod
}

// SetupPod builds the corev1.Pod object for this descriptor in the given
// namespace and stores it in p.pod. Each resource map is applied as limits
// when the pod is Guaranteed QoS and as requests otherwise; initContainer
// controls whether the resources are attached to an init container.
func (p *podDesc) SetupPod(ns string, initContainer bool) {
	builder := st.MakePod().Namespace(ns).Name(p.podName).SchedulerName(p.schedulerName)

	// applyResources attaches one container's worth of resources to the builder.
	applyResources := func(res map[string]string) {
		if p.isGuaranteed {
			builder = util.WithLimits(builder, res, initContainer)
		} else {
			builder = util.WithRequests(builder, res, initContainer)
		}
	}

	if p.resourcesMap != nil {
		applyResources(p.resourcesMap)
	}
	for _, res := range p.multiResourcesMap {
		applyResources(res)
	}

	p.pod = builder.Obj()
}

// testCase bundles the NodeResourceTopology objects to seed the fake cluster
// with and the ordered pod create/delete steps to run against them.
type testCase struct {
	name                   string
	nodeResourceTopologies []*topologyv1alpha2.NodeResourceTopology
	podDescs               []podDesc
}

// init registers klog flags on the default flag set and lets the
// SCHED_PLUGINS_TEST_VERBOSE environment variable override the scheduler log
// verbosity (consumed by the tests via os.Args + flag.Parse).
func init() {
	klog.InitFlags(nil)
	if val, ok := os.LookupEnv("SCHED_PLUGINS_TEST_VERBOSE"); ok {
		schedVerbose = val
	}
}

// TestTopologyCachePluginWithoutUpdates verifies the pessimistic NRT cache
// overreservation behavior when the NRT objects are never refreshed: BU pods
// are unaffected, GU pods can be blocked by overallocation, and the
// DiscardReservedNodes mode relaxes the reservation tracking.
//
// Fixes vs previous revision:
//   - the isDelete branch used p.pod.* for logging, but delete descriptors
//     never go through SetupPod, so p.pod is nil there (panic on failure);
//   - updatedPod may be nil when the wait helper errors out, so its Status
//     must not be dereferenced unconditionally;
//   - the "discardReserved" scheduler name now uses the shared constant.
func TestTopologyCachePluginWithoutUpdates(t *testing.T) {

	os.Args = []string{"unused", "-logtostderr", "-v", schedVerbose}
	t.Logf("args = %v", os.Args[1:])
	flag.Parse()

	// key: BE: Best Effort QoS; BU: BUrstable QoS; GU: GUaranteed QoS
	for _, tt := range []testCase{
		// BE pods: not impacted at all (no resource tracking)
		{
			name: "BU pod: pessimistic cache overallocation not impactful, pod to be scheduled",
			podDescs: []podDesc{
				{
					podName:      "nrt-bu-overalloc-1000",
					isGuaranteed: false,
					resourcesMap: map[string]string{
						string(corev1.ResourceCPU):    "16",
						string(corev1.ResourceMemory): "24Gi",
					},
					expectedNode: anyNode,
				},
				{
					podName:      "nrt-bu-overalloc-2000",
					isGuaranteed: false,
					resourcesMap: map[string]string{
						string(corev1.ResourceCPU):    "16",
						string(corev1.ResourceMemory): "24Gi",
					},
					expectedNode: anyNode,
				},
			},
			nodeResourceTopologies: []*topologyv1alpha2.NodeResourceTopology{
				MakeNRT().Name("fake-node-cache-1").
					Attributes(topologyv1alpha2.AttributeList{
						{
							Name:  nodeconfig.AttributePolicy,
							Value: "single-numa-node",
						},
						{
							Name:  nodeconfig.AttributeScope,
							Value: "container",
						},
					}).
					Zone(
						topologyv1alpha2.ResourceInfoList{
							noderesourcetopology.MakeTopologyResInfo(cpu, "32", "30"),
							noderesourcetopology.MakeTopologyResInfo(memory, "64Gi", "60Gi"),
						}).
					Zone(
						topologyv1alpha2.ResourceInfoList{
							noderesourcetopology.MakeTopologyResInfo(cpu, "32", "30"),
							noderesourcetopology.MakeTopologyResInfo(memory, "64Gi", "62Gi"),
						}).Obj(),
				MakeNRT().Name("fake-node-cache-2").
					Attributes(topologyv1alpha2.AttributeList{
						{
							Name:  nodeconfig.AttributePolicy,
							Value: "single-numa-node",
						},
						{
							Name:  nodeconfig.AttributeScope,
							Value: "container",
						},
					}).
					Zone(
						topologyv1alpha2.ResourceInfoList{
							noderesourcetopology.MakeTopologyResInfo(cpu, "32", "10"),
							noderesourcetopology.MakeTopologyResInfo(memory, "64Gi", "14Gi"),
						}).
					Zone(
						topologyv1alpha2.ResourceInfoList{
							noderesourcetopology.MakeTopologyResInfo(cpu, "32", "8"),
							noderesourcetopology.MakeTopologyResInfo(memory, "64Gi", "10Gi"),
						}).Obj(),
			},
		},
		{
			name: "BE pod: with devices, pessimistic cache overallocation prevents pod to be scheduled",
			podDescs: []podDesc{
				{
					podName:      "nrt-be-overalloc-3000",
					isGuaranteed: true, // aka set resourcesMap in limits
					resourcesMap: map[string]string{
						nicResourceName: "2",
					},
					expectedNode: "fake-node-cache-2",
				},
				{
					podName:      "nrt-be-overalloc-4000",
					isGuaranteed: true, // aka set resourcesMap in limits
					resourcesMap: map[string]string{
						nicResourceName: "2",
					},
					expectedNode: "",
				},
			},
			nodeResourceTopologies: makeTestFullyAvailableNRTs(),
		},
		{
			name: "GU pod: pessimistic cache overallocation prevents pod to be scheduled",
			podDescs: []podDesc{
				{
					podName:      "nrt-gu-overalloc-1000",
					isGuaranteed: true,
					resourcesMap: map[string]string{
						string(corev1.ResourceCPU):    "16",
						string(corev1.ResourceMemory): "24Gi",
					},
					expectedNode: "fake-node-cache-1",
				},
				{
					podName:      "nrt-gu-overalloc-2000",
					isGuaranteed: true,
					resourcesMap: map[string]string{
						string(corev1.ResourceCPU):    "16",
						string(corev1.ResourceMemory): "24Gi",
					},
					expectedNode: "",
				},
			},
			nodeResourceTopologies: []*topologyv1alpha2.NodeResourceTopology{
				MakeNRT().Name("fake-node-cache-1").
					Attributes(topologyv1alpha2.AttributeList{
						{
							Name:  nodeconfig.AttributePolicy,
							Value: "single-numa-node",
						},
						{
							Name:  nodeconfig.AttributeScope,
							Value: "container",
						},
					}).
					Zone(
						topologyv1alpha2.ResourceInfoList{
							noderesourcetopology.MakeTopologyResInfo(cpu, "32", "30"),
							noderesourcetopology.MakeTopologyResInfo(memory, "64Gi", "60Gi"),
						}).
					Zone(
						topologyv1alpha2.ResourceInfoList{
							noderesourcetopology.MakeTopologyResInfo(cpu, "32", "30"),
							noderesourcetopology.MakeTopologyResInfo(memory, "64Gi", "62Gi"),
						}).Obj(),
				MakeNRT().Name("fake-node-cache-2").
					Attributes(topologyv1alpha2.AttributeList{
						{
							Name:  nodeconfig.AttributePolicy,
							Value: "single-numa-node",
						},
						{
							Name:  nodeconfig.AttributeScope,
							Value: "container",
						},
					}).
					Zone(
						topologyv1alpha2.ResourceInfoList{
							noderesourcetopology.MakeTopologyResInfo(cpu, "32", "10"),
							noderesourcetopology.MakeTopologyResInfo(memory, "64Gi", "14Gi"),
						}).
					Zone(
						topologyv1alpha2.ResourceInfoList{
							noderesourcetopology.MakeTopologyResInfo(cpu, "32", "8"),
							noderesourcetopology.MakeTopologyResInfo(memory, "64Gi", "10Gi"),
						}).Obj(),
			},
		},
		{
			name: "GU pod: pessimistic cache overallocation ignores deletes prevents pod to be scheduled",
			podDescs: []podDesc{
				{
					podName:      "nrt-gu-overalloc-del-3000",
					isGuaranteed: true,
					resourcesMap: map[string]string{
						string(corev1.ResourceCPU):    "16",
						string(corev1.ResourceMemory): "24Gi",
					},
					expectedNode: "fake-node-cache-1",
				},
				{
					podName:  "nrt-gu-overalloc-del-3000",
					isDelete: true,
				},
				{
					podName:      "nrt-gu-overalloc-del-4000",
					isGuaranteed: true,
					resourcesMap: map[string]string{
						string(corev1.ResourceCPU):    "16",
						string(corev1.ResourceMemory): "24Gi",
					},
					expectedNode: "",
				},
			},
			nodeResourceTopologies: []*topologyv1alpha2.NodeResourceTopology{
				MakeNRT().Name("fake-node-cache-1").
					Attributes(topologyv1alpha2.AttributeList{
						{
							Name:  nodeconfig.AttributePolicy,
							Value: "single-numa-node",
						},
						{
							Name:  nodeconfig.AttributeScope,
							Value: "container",
						},
					}).
					Zone(
						topologyv1alpha2.ResourceInfoList{
							noderesourcetopology.MakeTopologyResInfo(cpu, "32", "30"),
							noderesourcetopology.MakeTopologyResInfo(memory, "64Gi", "60Gi"),
						}).
					Zone(
						topologyv1alpha2.ResourceInfoList{
							noderesourcetopology.MakeTopologyResInfo(cpu, "32", "30"),
							noderesourcetopology.MakeTopologyResInfo(memory, "64Gi", "62Gi"),
						}).Obj(),
				MakeNRT().Name("fake-node-cache-2").
					Attributes(topologyv1alpha2.AttributeList{
						{
							Name:  nodeconfig.AttributePolicy,
							Value: "single-numa-node",
						},
						{
							Name:  nodeconfig.AttributeScope,
							Value: "container",
						},
					}).
					Zone(
						topologyv1alpha2.ResourceInfoList{
							noderesourcetopology.MakeTopologyResInfo(cpu, "32", "10"),
							noderesourcetopology.MakeTopologyResInfo(memory, "64Gi", "14Gi"),
						}).
					Zone(
						topologyv1alpha2.ResourceInfoList{
							noderesourcetopology.MakeTopologyResInfo(cpu, "32", "8"),
							noderesourcetopology.MakeTopologyResInfo(memory, "64Gi", "10Gi"),
						}).Obj(),
			},
		},
		{
			name: "GU pod: DiscardReservedNodes: allows scheduling on both Zones",
			podDescs: []podDesc{
				{
					podName:      "nrt-gu-discardresv-1000",
					isGuaranteed: true,
					resourcesMap: map[string]string{
						string(corev1.ResourceCPU):    "16",
						string(corev1.ResourceMemory): "24Gi",
					},
					schedulerName: discardReservedSchedulerName,
					expectedNode:  "fake-node-cache-1",
				},
				{
					podName:      "nrt-gu-discardresv-2000",
					isGuaranteed: true,
					resourcesMap: map[string]string{
						string(corev1.ResourceCPU):    "16",
						string(corev1.ResourceMemory): "24Gi",
					},
					schedulerName: discardReservedSchedulerName,
					expectedNode:  "fake-node-cache-1",
				},
			},
			nodeResourceTopologies: []*topologyv1alpha2.NodeResourceTopology{
				MakeNRT().Name("fake-node-cache-1").
					Attributes(topologyv1alpha2.AttributeList{
						{
							Name:  nodeconfig.AttributePolicy,
							Value: "single-numa-node",
						},
						{
							Name:  nodeconfig.AttributeScope,
							Value: "container",
						},
					}).
					Zone(
						topologyv1alpha2.ResourceInfoList{
							noderesourcetopology.MakeTopologyResInfo(cpu, "32", "30"),
							noderesourcetopology.MakeTopologyResInfo(memory, "64Gi", "60Gi"),
						}).
					Zone(
						topologyv1alpha2.ResourceInfoList{
							noderesourcetopology.MakeTopologyResInfo(cpu, "32", "30"),
							noderesourcetopology.MakeTopologyResInfo(memory, "64Gi", "62Gi"),
						}).Obj(),
				MakeNRT().Name("fake-node-cache-2").
					Attributes(topologyv1alpha2.AttributeList{
						{
							Name:  nodeconfig.AttributePolicy,
							Value: "single-numa-node",
						},
						{
							Name:  nodeconfig.AttributeScope,
							Value: "container",
						},
					}).
					Zone(
						topologyv1alpha2.ResourceInfoList{
							noderesourcetopology.MakeTopologyResInfo(cpu, "32", "10"),
							noderesourcetopology.MakeTopologyResInfo(memory, "64Gi", "14Gi"),
						}).
					Zone(
						topologyv1alpha2.ResourceInfoList{
							noderesourcetopology.MakeTopologyResInfo(cpu, "32", "8"),
							noderesourcetopology.MakeTopologyResInfo(memory, "64Gi", "10Gi"),
						}).Obj(),
			},
		},
		{
			name: "GU pod: DiscardReservedNodes: new pod is successfully scheduled on the node, after deleting pod consuming most resources",
			podDescs: []podDesc{
				{
					podName:      "nrt-gu-discardresv-ok-3000",
					isGuaranteed: true,
					resourcesMap: map[string]string{
						string(corev1.ResourceCPU):    "30",
						string(corev1.ResourceMemory): "60Gi",
					},
					schedulerName: discardReservedSchedulerName,
					expectedNode:  "fake-node-cache-1",
				},
				{
					podName:       "nrt-gu-discardresv-ok-3000",
					isDelete:      true,
					schedulerName: discardReservedSchedulerName,
				},
				{
					podName:      "nrt-gu-discardresv-ok-4000",
					isGuaranteed: true,
					resourcesMap: map[string]string{
						string(corev1.ResourceCPU):    "16",
						string(corev1.ResourceMemory): "24Gi",
					},
					schedulerName: discardReservedSchedulerName,
					expectedNode:  "fake-node-cache-1",
				},
			},
			nodeResourceTopologies: []*topologyv1alpha2.NodeResourceTopology{
				MakeNRT().Name("fake-node-cache-1").
					Attributes(topologyv1alpha2.AttributeList{
						{
							Name:  nodeconfig.AttributePolicy,
							Value: "single-numa-node",
						},
						{
							Name:  nodeconfig.AttributeScope,
							Value: "container",
						},
					}).
					Zone(
						topologyv1alpha2.ResourceInfoList{
							noderesourcetopology.MakeTopologyResInfo(cpu, "32", "30"),
							noderesourcetopology.MakeTopologyResInfo(memory, "64Gi", "60Gi"),
						}).
					Zone(
						topologyv1alpha2.ResourceInfoList{
							noderesourcetopology.MakeTopologyResInfo(cpu, "32", "30"),
							noderesourcetopology.MakeTopologyResInfo(memory, "64Gi", "62Gi"),
						}).Obj(),
				MakeNRT().Name("fake-node-cache-2").
					Attributes(topologyv1alpha2.AttributeList{
						{
							Name:  nodeconfig.AttributePolicy,
							Value: "single-numa-node",
						},
						{
							Name:  nodeconfig.AttributeScope,
							Value: "container",
						},
					}).
					Zone(
						topologyv1alpha2.ResourceInfoList{
							noderesourcetopology.MakeTopologyResInfo(cpu, "32", "10"),
							noderesourcetopology.MakeTopologyResInfo(memory, "64Gi", "14Gi"),
						}).
					Zone(
						topologyv1alpha2.ResourceInfoList{
							noderesourcetopology.MakeTopologyResInfo(cpu, "32", "8"),
							noderesourcetopology.MakeTopologyResInfo(memory, "64Gi", "10Gi"),
						}).Obj(),
			},
		},
	} {
		t.Run(tt.name, func(t *testing.T) {
			// because caching, each testcase needs to run from a clean slate
			testCtx := &testContext{}
			testCtx.Ctx, testCtx.CancelFn = context.WithCancel(context.Background())

			cs := clientset.NewForConfigOrDie(globalKubeConfig)
			scheme := runtime.NewScheme()
			utilruntime.Must(clientgoscheme.AddToScheme(scheme))
			utilruntime.Must(topologyv1alpha2.AddToScheme(scheme))
			extClient, err := ctrlclient.New(globalKubeConfig, ctrlclient.Options{Scheme: scheme})
			if err != nil {
				t.Fatalf("Failed to create client: %v", err)
			}
			testCtx.ClientSet = cs
			testCtx.KubeConfig = globalKubeConfig

			if err := waitForNRT(t, cs); err != nil {
				t.Fatalf("Timed out waiting for CRD to be ready: %v", err)
			}

			ns := fmt.Sprintf("integration-test-%v", string(uuid.NewUUID()))
			createNamespace(t, testCtx, ns)

			cfg, err := util.NewDefaultSchedulerComponentConfig()
			if err != nil {
				t.Fatal(err)
			}
			cfg.Profiles[0].Plugins.Filter.Enabled = append(cfg.Profiles[0].Plugins.Filter.Enabled, schedapi.Plugin{Name: noderesourcetopology.Name})
			cfg.Profiles[0].Plugins.Reserve.Enabled = append(cfg.Profiles[0].Plugins.Reserve.Enabled, schedapi.Plugin{Name: noderesourcetopology.Name})
			cfg.Profiles[0].Plugins.Score.Enabled = append(cfg.Profiles[0].Plugins.Score.Enabled, schedapi.Plugin{Name: noderesourcetopology.Name})
			cfg.Profiles[0].PluginConfig = append(cfg.Profiles[0].PluginConfig, schedapi.PluginConfig{
				Name: noderesourcetopology.Name,
				Args: &schedconfig.NodeResourceTopologyMatchArgs{
					ScoringStrategy:          schedconfig.ScoringStrategy{Type: schedconfig.LeastAllocated},
					CacheResyncPeriodSeconds: defaultCacheResyncPeriodSeconds,
				},
			})

			cfg.Profiles = append(cfg.Profiles, discardReservedSchedulerProfile())

			defer func() {
				cleanupTest(t, testCtx)
				t.Logf("test environment %q cleaned up", tt.name)
			}()

			if err := createNodesFromNodeResourceTopologies(t, cs, testCtx.Ctx, tt.nodeResourceTopologies); err != nil {
				t.Fatalf("%v", err)
			}

			var pods []*corev1.Pod
			for idx := range tt.podDescs {
				p := &tt.podDescs[idx]
				if p.isDelete {
					continue
				}

				p.SetupPod(ns, false)
				pods = append(pods, p.pod)
				t.Logf("Prepared pod: %s (phase=%s)", p.pod.Name, p.pod.Status.Phase)
			}

			t.Logf("Start-topology-match-cache-pfp-test %q", tt.name)
			defer cleanupNodeResourceTopologies(t, testCtx.Ctx, extClient, tt.nodeResourceTopologies)
			defer func() {
				cleanupPods(t, testCtx, pods)
				t.Log("Pods cleaned up")
			}()

			t.Logf("Creating %d NRT objects", len(tt.nodeResourceTopologies))
			if err := createNodeResourceTopologies(testCtx.Ctx, extClient, tt.nodeResourceTopologies); err != nil {
				t.Fatal(err)
			}

			testCtx = initTestSchedulerWithOptions(
				t,
				testCtx,
				scheduler.WithProfiles(cfg.Profiles...),
				// default value is 30 seconds, lower it to 10 to speed up tests
				scheduler.WithPodMaxInUnschedulablePodsDuration(10*time.Second),
				scheduler.WithFrameworkOutOfTreeRegistry(fwkruntime.Registry{noderesourcetopology.Name: noderesourcetopology.New}),
			)
			syncInformerFactory(testCtx)
			go testCtx.Scheduler.Run(testCtx.Ctx)
			t.Log("init scheduler success")

			for idx := range tt.podDescs {
				p := &tt.podDescs[idx]
				if p.isDelete {
					// NOTE: delete descriptors never go through SetupPod, so p.pod is nil
					// here; only p.podName (and the test namespace) may be used below.
					t.Logf("Waiting before to delete Pod %q", p.podName)
					updatedPod, err := podIsScheduled(t, 1*time.Second, 20, cs, ns, p.podName)
					if err != nil {
						// we need more context, but we don't want to clutter the logs;
						// updatedPod may be nil on error, so guard the status dump
						var status interface{} = "<unavailable>"
						if updatedPod != nil {
							status = formatObject(updatedPod.Status)
						}
						t.Logf("%s: pod %s/%s to be scheduled, error: %v\nstatus=%s", tt.name, ns, p.podName, err, status)
						t.Errorf("Pod %q to be scheduled, error: %v", p.podName, err)
					}

					t.Logf("Deleting Pod %q", p.podName)
					err = cs.CoreV1().Pods(ns).Delete(testCtx.Ctx, p.podName, metav1.DeleteOptions{})
					if err != nil {
						t.Fatalf("Failed to delete Pod %q: %v", p.podName, err)
					}
				} else {
					t.Logf("Creating Pod %q: scheduler: %q", p.pod.Name, p.pod.Spec.SchedulerName)
					_, err := cs.CoreV1().Pods(ns).Create(testCtx.Ctx, p.pod, metav1.CreateOptions{})
					if err != nil {
						t.Fatalf("Failed to create Pod %q: %v", p.pod.Name, err)
					}
				}
			}

			for _, p := range tt.podDescs {
				if p.isDelete {
					continue
				}

				// expectedNode == "" means the pod must stay pending, anything else
				// (including anyNode) means it must get scheduled
				action := "scheduled"
				checkPod := podIsScheduled
				if p.expectedNode == "" {
					action = "kept pending"
					checkPod = podIsPending
				}

				// set timeout to 50s, flushUnschedulableQLeftover is running every 30 seconds + 10 seconds for podMaxInUnschedulablePodsDuration + 10 seconds just to be sure
				// we need to make sure scheduler will move failed pod from Unschedulable queue to Active queue at least once
				updatedPod, err := checkPod(t, 1*time.Second, 50, cs, p.pod.Namespace, p.pod.Name)
				if err != nil {
					// we need more context, but we don't want to clutter the logs;
					// updatedPod may be nil on error, so guard the status dump
					var status interface{} = "<unavailable>"
					if updatedPod != nil {
						status = formatObject(updatedPod.Status)
					}
					t.Logf("%s: pod %s/%s to be %s, error: %v\nstatus=%s", tt.name, p.pod.Namespace, p.pod.Name, action, err, status)
					t.Errorf("Pod %s/%s to be %s, error: %v", p.pod.Namespace, p.pod.Name, action, err)
				}
				t.Logf("Pod %v %s", p.pod.Name, action)
			}

			for _, p := range tt.podDescs {
				if p.isDelete {
					continue
				}

				nodeName, err := getNodeName(testCtx.Ctx, cs, ns, p.pod.Name)
				t.Logf("Pod %s scheduled on node %q (expected %q)", p.pod.Name, nodeName, p.expectedNode)
				if err != nil {
					t.Logf("%v", err)
				}

				if !podMatchesExpectedNode(t, p.pod.Namespace, p.pod.Name, nodeName, p.expectedNode) {
					t.Errorf("misplaced pod: %q got %q expected %q", p.pod.Name, nodeName, p.expectedNode)
				}
			}

			t.Logf("Case %v finished", tt.name)
		})
	}
}

// TestTopologyCachePluginWithPodFingerprintUpdates verifies that a GU pod
// blocked by pessimistic cache overreservation eventually gets scheduled once
// the NRT object is refreshed with a matching pod fingerprint (PFP), letting
// the plugin cache resync. The NRT updates run on a goroutine concurrently
// with the scheduler's resync loop; the main goroutine waits for the outcome.
func TestTopologyCachePluginWithPodFingerprintUpdates(t *testing.T) {

	os.Args = []string{"unused", "-logtostderr", "-v", schedVerbose}
	t.Logf("args = %v", os.Args[1:])
	flag.Parse()

	tt := testCase{
		name: "GU pod: pessimistic cache overallocation prevents pod to be scheduled until resync happens",
		podDescs: []podDesc{
			// each pod asks > 50% CPUs on NUMA zones; thus, the NRT overallocation will pessimistically
			// overreserve > 50% CPUs on the node. Note the NodeResourceFit plugin must still pass
			{
				podName:      "nrt-wkp-pod-1000",
				isGuaranteed: true,
				resourcesMap: map[string]string{
					string(corev1.ResourceCPU):    "24",
					string(corev1.ResourceMemory): "12Gi",
				},
				expectedNode: "fake-node-cache-1",
			},
			{
				podName:      "nrt-wkp-pod-2000",
				isGuaranteed: true,
				resourcesMap: map[string]string{
					string(corev1.ResourceCPU):    "24",
					string(corev1.ResourceMemory): "12Gi",
				},
				expectedNode: "fake-node-cache-1",
			},
		},
		nodeResourceTopologies: makeTestFullyAvailableNRTSingle(),
	}

	// because caching, each testcase needs to run from a clean slate
	extTestCtx := makeNRTSchedTestContext(t, nil)
	testCtx := extTestCtx.tctx // shortcut

	defer func() {
		cleanupTest(t, testCtx)
		t.Log("test environment cleaned up")
	}()

	if err := waitForNRT(t, extTestCtx.cli); err != nil {
		t.Fatalf("Timed out waiting for CRD to be ready: %v", err)
	}

	ns := fmt.Sprintf("integration-test-%v", string(uuid.NewUUID()))
	createNamespace(t, testCtx, ns)

	if err := createNodesFromNodeResourceTopologies(t, extTestCtx.cli, testCtx.Ctx, tt.nodeResourceTopologies); err != nil {
		t.Fatalf("%v", err)
	}

	var pods []*corev1.Pod
	for idx := range tt.podDescs {
		p := &tt.podDescs[idx]
		if p.isDelete {
			continue
		}

		p.SetupPod(ns, false)
		pods = append(pods, p.pod)
		t.Logf("Prepared pod: %s (phase=%s)", p.pod.Name, p.pod.Status.Phase)
	}

	t.Logf("Start-topology-match-cache-pfp-test %q", tt.name)
	defer cleanupNodeResourceTopologies(t, testCtx.Ctx, extTestCtx.extCli, tt.nodeResourceTopologies)
	defer func() {
		cleanupPods(t, testCtx, pods)
		t.Log("Pods cleaned up")
	}()

	t.Logf("Creating %d NRT objects", len(tt.nodeResourceTopologies))
	if err := createNodeResourceTopologies(testCtx.Ctx, extTestCtx.extCli, tt.nodeResourceTopologies); err != nil {
		t.Fatal(err)
	}

	testCtx = initTestSchedulerWithOptions(
		t,
		testCtx,
		scheduler.WithProfiles(extTestCtx.cfg.Profiles...),
		scheduler.WithFrameworkOutOfTreeRegistry(fwkruntime.Registry{noderesourcetopology.Name: noderesourcetopology.New}),
	)
	syncInformerFactory(testCtx)
	go testCtx.Scheduler.Run(testCtx.Ctx)
	t.Log("init scheduler success")

	for idx := range tt.podDescs {
		p := &tt.podDescs[idx]
		t.Logf("Creating Pod %q", p.pod.Name)
		_, err := extTestCtx.cli.CoreV1().Pods(ns).Create(testCtx.Ctx, p.pod, metav1.CreateOptions{})
		if err != nil {
			t.Fatalf("Failed to create Pod %q: %v", p.pod.Name, err)
		}
	}

	// first pass: only one pod can fit; the second must be blocked by the
	// pessimistic overreservation until the resync below unblocks it
	scheduledPods, _, _ := waitForPodList(t, extTestCtx.cli, tt.podDescs, 1*time.Minute)
	expectedScheduled := 1
	if len(scheduledPods) != expectedScheduled {
		t.Fatalf("pods scheduled %d expected %d", len(scheduledPods), expectedScheduled)
	}

	// we want to run concurrently with the resync loop is running.
	// NOTE(review): this goroutine is not waited on; it is assumed to finish
	// well within the 5-minute waitForPodList window below — confirm.
	go func() {
		// the first pod lands on the expected node
		runningPod := tt.podDescs[0].pod.DeepCopy()
		runningPod.Status.Phase = corev1.PodRunning
		_, err := extTestCtx.cli.CoreV1().Pods(runningPod.Namespace).UpdateStatus(testCtx.Ctx, runningPod, metav1.UpdateOptions{})
		if err != nil {
			// we can call t.Fatalf only on the main goroutine, so we just log
			t.Logf("cannot update status of %s/%s: %s", runningPod.Namespace, runningPod.Name, err)
		}

		pfpSign := mkPFP(t, "fake-node-cache-1", tt.podDescs[0].pod)
		updatedNRTs := []*topologyv1alpha2.NodeResourceTopology{
			MakeNRT().Name("fake-node-cache-1").
				Attributes(topologyv1alpha2.AttributeList{
					{
						Name:  nodeconfig.AttributePolicy,
						Value: "single-numa-node",
					},
					{
						Name:  nodeconfig.AttributeScope,
						Value: "container",
					},
					{
						Name:  podfingerprint.Attribute,
						Value: pfpSign,
					},
				}).
				Annotations(map[string]string{
					podfingerprint.Annotation: pfpSign,
				}).
				Zone(
					topologyv1alpha2.ResourceInfoList{
						noderesourcetopology.MakeTopologyResInfo(cpu, "32", "6"),
						noderesourcetopology.MakeTopologyResInfo(memory, "64Gi", "48Gi"),
					}).
				Zone(
					topologyv1alpha2.ResourceInfoList{
						noderesourcetopology.MakeTopologyResInfo(cpu, "32", "30"),
						noderesourcetopology.MakeTopologyResInfo(memory, "64Gi", "62Gi"),
					}).Obj(),
		}

		// wait some time to have reasonable chance to hit while the resync loop is running
		time.Sleep(extTestCtx.CacheResyncPeriodSeconds(3))

		// first update: this is supposed to trigger the cache update because PFP are expected to match
		t.Logf("updating %d NRTs", len(updatedNRTs))
		err = updateNodeResourceTopologies(testCtx.Ctx, extTestCtx.extCli, updatedNRTs)
		if err != nil {
			// we can call t.Fatalf only on the main goroutine, so we just log
			t.Logf("cannot update NRTs: %s", err)
		}
		t.Logf("updated %d NRTs", len(updatedNRTs))

		// When will the resync loop trigger? we can't predict. So we wait "long enough" before to send the trigger event
		time.Sleep(extTestCtx.CacheResyncPeriodSeconds(5))

		updatedNRTs = []*topologyv1alpha2.NodeResourceTopology{
			MakeNRT().Name("fake-node-cache-1").
				Attributes(topologyv1alpha2.AttributeList{
					{
						Name:  nodeconfig.AttributePolicy,
						Value: "single-numa-node",
					},
					{
						Name:  nodeconfig.AttributeScope,
						Value: "container",
					},
				}).
				Zone(
					topologyv1alpha2.ResourceInfoList{
						noderesourcetopology.MakeTopologyResInfo(cpu, "32", "6"),
						noderesourcetopology.MakeTopologyResInfo(memory, "64Gi", "48Gi"),
					}).
				Zone(
					topologyv1alpha2.ResourceInfoList{
						noderesourcetopology.MakeTopologyResInfo(cpu, "32", "30"),
						noderesourcetopology.MakeTopologyResInfo(memory, "64Gi", "62Gi"),
					}).Obj(),
		}

		// second update. This will trigger the reschedule attempt, cache content won't change.
		t.Logf("updating %d NRTs", len(updatedNRTs))
		err = updateNodeResourceTopologies(testCtx.Ctx, extTestCtx.extCli, updatedNRTs)
		if err != nil {
			// we can call t.Fatalf only on the main goroutine, so we just log
			t.Logf("cannot update NRTs: %s", err)
		}
		t.Logf("updated %d NRTs", len(updatedNRTs))

	}()

	// we need a very generous timeout here to make sure the resync code in the scheduler plugin catches up
	scheduledPods, pendingPods, failedPods := waitForPodList(t, extTestCtx.cli, tt.podDescs, 5*time.Minute)

	if len(failedPods) > 0 {
		var sb strings.Builder
		for name, err := range failedPods {
			fmt.Fprintf(&sb, "[%s: %v] ", name, err)
		}
		t.Fatalf("failed pods: %s", sb.String())
	}

	expectedScheduled = 2
	if len(scheduledPods) != expectedScheduled {
		t.Fatalf("pods running %d expected %d", len(scheduledPods), expectedScheduled)
	}
	if len(pendingPods) > 0 {
		t.Fatalf("expected non-running pods 0 got %d", len(pendingPods))
	}

	t.Logf("Case %v finished", tt.name)
}

// TestTopologyCachePluginWithAttributeUpdates verifies that, with the cache
// resync scope set to CacheResyncScopeAll, a multi-container GU pod which is
// unschedulable under pod-scope topology manager gets scheduled once the NRT
// attributes change to container scope and a resync is triggered. As above,
// the NRT updates race with the resync loop on a separate goroutine.
func TestTopologyCachePluginWithAttributeUpdates(t *testing.T) {

	os.Args = []string{"unused", "-logtostderr", "-v", schedVerbose}
	t.Logf("args = %v", os.Args[1:])
	flag.Parse()

	tt := testCase{
		name: "GU pod: unschedulable until the topology manager configuration changes and triggers a resync",
		podDescs: []podDesc{
			// each pod asks > 50% CPUs on NUMA zones; thus, the NRT overallocation will pessimistically
			// overreserve > 50% CPUs on the node. Note the NodeResourceFit plugin must still pass
			{
				podName:      "nrt-attr-pod-1000",
				isGuaranteed: true,
				multiResourcesMap: []map[string]string{
					{
						string(corev1.ResourceCPU):    "24",
						string(corev1.ResourceMemory): "48Gi",
					},
					{
						string(corev1.ResourceCPU):    "24",
						string(corev1.ResourceMemory): "48Gi",
					},
				},
				expectedNode: "fake-node-cache-attr-1",
			},
		},
		nodeResourceTopologies: []*topologyv1alpha2.NodeResourceTopology{
			MakeNRT().Name("fake-node-cache-attr-1").
				Attributes(topologyv1alpha2.AttributeList{
					{
						Name:  nodeconfig.AttributePolicy,
						Value: "single-numa-node",
					},
					{
						Name:  nodeconfig.AttributeScope,
						Value: "pod",
					},
				}).
				Zone(
					topologyv1alpha2.ResourceInfoList{
						noderesourcetopology.MakeTopologyResInfo(cpu, "32", "30"),
						noderesourcetopology.MakeTopologyResInfo(memory, "64Gi", "62Gi"),
					}).
				Zone(
					topologyv1alpha2.ResourceInfoList{
						noderesourcetopology.MakeTopologyResInfo(cpu, "32", "30"),
						noderesourcetopology.MakeTopologyResInfo(memory, "64Gi", "62Gi"),
					}).Obj(),
		},
	}

	// because caching, each testcase needs to run from a clean slate
	cacheResyncScope := schedconfig.CacheResyncScopeAll
	cacheArgs := schedconfig.NodeResourceTopologyCache{
		ResyncScope: &cacheResyncScope,
	}
	extTestCtx := makeNRTSchedTestContext(t, &cacheArgs)
	testCtx := extTestCtx.tctx // shortcut

	defer func() {
		cleanupTest(t, testCtx)
		t.Log("test environment cleaned up")
	}()

	if err := waitForNRT(t, extTestCtx.cli); err != nil {
		t.Fatalf("Timed out waiting for CRD to be ready: %v", err)
	}

	ns := fmt.Sprintf("integration-test-%v", string(uuid.NewUUID()))
	createNamespace(t, testCtx, ns)

	if err := createNodesFromNodeResourceTopologies(t, testCtx.ClientSet, testCtx.Ctx, tt.nodeResourceTopologies); err != nil {
		t.Fatalf("%v", err)
	}

	var pods []*corev1.Pod
	for idx := range tt.podDescs {
		p := &tt.podDescs[idx]
		if p.isDelete {
			continue
		}

		p.SetupPod(ns, false)
		pods = append(pods, p.pod)
		t.Logf("Prepared pod: %s (phase=%s)", p.pod.Name, p.pod.Status.Phase)
	}

	t.Logf("Start-topology-match-cache-attr-test %q", tt.name)
	defer cleanupNodeResourceTopologies(t, testCtx.Ctx, extTestCtx.extCli, tt.nodeResourceTopologies)
	defer func() {
		cleanupPods(t, testCtx, pods)
		t.Log("Pods cleaned up")
	}()

	t.Logf("Creating %d NRT objects", len(tt.nodeResourceTopologies))
	if err := createNodeResourceTopologies(testCtx.Ctx, extTestCtx.extCli, tt.nodeResourceTopologies); err != nil {
		t.Fatal(err)
	}

	testCtx = initTestSchedulerWithOptions(
		t,
		testCtx,
		scheduler.WithProfiles(extTestCtx.cfg.Profiles...),
		scheduler.WithFrameworkOutOfTreeRegistry(fwkruntime.Registry{noderesourcetopology.Name: noderesourcetopology.New}),
	)
	syncInformerFactory(testCtx)
	go testCtx.Scheduler.Run(testCtx.Ctx)
	t.Log("init scheduler success")

	for idx := range tt.podDescs {
		p := &tt.podDescs[idx]
		t.Logf("Creating Pod %q", p.pod.Name)
		_, err := extTestCtx.cli.CoreV1().Pods(ns).Create(testCtx.Ctx, p.pod, metav1.CreateOptions{})
		if err != nil {
			t.Fatalf("Failed to create Pod %q: %v", p.pod.Name, err)
		}
	}

	// first pass: under pod-scope topology manager the pod must NOT schedule
	scheduledPods, _, _ := waitForPodList(t, extTestCtx.cli, tt.podDescs, 1*time.Minute)
	expectedScheduled := 0
	if len(scheduledPods) != expectedScheduled {
		t.Fatalf("pods scheduled %d expected %d", len(scheduledPods), expectedScheduled)
	}

	// we want to run concurrently with the resync loop is running.
	// NOTE(review): this goroutine is not waited on; it is assumed to finish
	// well within the 5-minute waitForPodList window below — confirm.
	go func() {

		// wait some time to have reasonable chance to hit while the resync loop is running
		time.Sleep(extTestCtx.CacheResyncPeriodSeconds(3))

		// first update: this is supposed to trigger the cache update because PFP are expected to match
		t.Logf("updating %d NRTs", 1)
		err := updateNodeResourceTopologies(testCtx.Ctx, extTestCtx.extCli, []*topologyv1alpha2.NodeResourceTopology{
			MakeNRT().Name("fake-node-cache-attr-1").
				Attributes(topologyv1alpha2.AttributeList{
					{
						Name:  nodeconfig.AttributePolicy,
						Value: "single-numa-node",
					},
					{
						Name:  nodeconfig.AttributeScope,
						Value: "container",
					},
				}).
				Zone(
					topologyv1alpha2.ResourceInfoList{
						noderesourcetopology.MakeTopologyResInfo(cpu, "32", "30"),
						noderesourcetopology.MakeTopologyResInfo(memory, "64Gi", "62Gi"),
					}).
				Zone(
					topologyv1alpha2.ResourceInfoList{
						noderesourcetopology.MakeTopologyResInfo(cpu, "32", "30"),
						noderesourcetopology.MakeTopologyResInfo(memory, "64Gi", "62Gi"),
					}).Obj(),
		})

		if err != nil {
			// we can call t.Fatalf only on the main goroutine, so we just log
			t.Logf("cannot update NRTs: %v", err)
		}
		t.Logf("updated %d NRTs", 1)

		// When will the resync loop trigger? we can't predict. So we wait "long enough" before to send the trigger event
		time.Sleep(extTestCtx.CacheResyncPeriodSeconds(5))

		// second update. This will trigger the reschedule attempt, cache content won't change.
		t.Logf("updating %d NRTs", 1)
		err = updateNodeResourceTopologies(testCtx.Ctx, extTestCtx.extCli, []*topologyv1alpha2.NodeResourceTopology{
			MakeNRT().Name("fake-node-cache-attr-1").
				Attributes(topologyv1alpha2.AttributeList{
					{
						Name:  nodeconfig.AttributePolicy,
						Value: "single-numa-node",
					},
					{
						Name:  nodeconfig.AttributeScope,
						Value: "container",
					},
					// trigger another update. The first update will race with the resync, and likely
					// the framework will be quicker, so the related retry will fail - the NRT cache
					// won't be updated just yet, resync() will. So we need a second update,
					// and we need to make sure something changed in the object to trigger a reschedule.
					{
						Name:  "foo",
						Value: "bar",
					},
				}).
				Zone(
					topologyv1alpha2.ResourceInfoList{
						noderesourcetopology.MakeTopologyResInfo(cpu, "32", "30"),
						noderesourcetopology.MakeTopologyResInfo(memory, "64Gi", "62Gi"),
					}).
				Zone(
					topologyv1alpha2.ResourceInfoList{
						noderesourcetopology.MakeTopologyResInfo(cpu, "32", "30"),
						noderesourcetopology.MakeTopologyResInfo(memory, "64Gi", "62Gi"),
					}).Obj(),
		})

		if err != nil {
			// we can call t.Fatalf only on the main goroutine, so we just log
			t.Logf("cannot update NRTs: %v", err)
		}
		t.Logf("updated %d NRTs", 1)

	}()

	// we need a very generous timeout here to make sure the resync code in the scheduler plugin catches up
	scheduledPods, pendingPods, failedPods := waitForPodList(t, extTestCtx.cli, tt.podDescs, 5*time.Minute)

	if len(failedPods) > 0 {
		var sb strings.Builder
		for name, err := range failedPods {
			fmt.Fprintf(&sb, "[%s: %v] ", name, err)
		}
		t.Fatalf("failed pods: %s", sb.String())
	}

	expectedScheduled = 1
	if len(scheduledPods) != expectedScheduled {
		t.Fatalf("pods running %d expected %d", len(scheduledPods), expectedScheduled)
	}
	if len(pendingPods) > 0 {
		t.Fatalf("expected non-running pods 0 got %d", len(pendingPods))
	}

	t.Logf("Case %v finished", tt.name)
}

// extTestContext bundles the base integration testContext with the extra
// clients and scheduler configuration needed by the NRT cache tests.
type extTestContext struct {
	tctx      *testContext                              // base test context (ctx, cancel, clientset, kubeconfig)
	cli       *clientset.Clientset                      // plain kubernetes clientset
	extCli    ctrlclient.Client                         // controller-runtime client with the NRT CRD scheme registered
	cfg       config.KubeSchedulerConfiguration         // scheduler config with the NRT match plugin enabled
	matchArgs schedconfig.NodeResourceTopologyMatchArgs // copy of the plugin args used to build cfg
}

// CacheResyncPeriodSeconds returns mult times the configured NRT cache
// resync period, expressed as a time.Duration.
func (etc extTestContext) CacheResyncPeriodSeconds(mult int) time.Duration {
	seconds := int64(mult) * etc.matchArgs.CacheResyncPeriodSeconds
	return time.Duration(seconds) * time.Second
}

// makeNRTSchedTestContext builds the extended test context used by the NRT
// cache tests: a plain clientset, a controller-runtime client that knows
// about the NodeResourceTopology CRD, and a default scheduler configuration
// with the NodeResourceTopologyMatch plugin enabled at the Filter, Reserve
// and Score extension points, configured with the given cache args.
func makeNRTSchedTestContext(t *testing.T, cacheArgs *schedconfig.NodeResourceTopologyCache) extTestContext {
	t.Helper()

	tctx := &testContext{}
	tctx.Ctx, tctx.CancelFn = context.WithCancel(context.Background())

	// the controller-runtime client needs a scheme including the NRT types.
	scheme := runtime.NewScheme()
	utilruntime.Must(clientgoscheme.AddToScheme(scheme))
	utilruntime.Must(topologyv1alpha2.AddToScheme(scheme))

	kubeClient := clientset.NewForConfigOrDie(globalKubeConfig)
	topoClient, err := ctrlclient.New(globalKubeConfig, ctrlclient.Options{Scheme: scheme})
	if err != nil {
		t.Fatalf("Failed to create client: %v", err)
	}
	tctx.ClientSet = kubeClient
	tctx.KubeConfig = globalKubeConfig

	schedCfg, err := util.NewDefaultSchedulerComponentConfig()
	if err != nil {
		t.Fatal(err)
	}

	matchArgs := schedconfig.NodeResourceTopologyMatchArgs{
		ScoringStrategy:          schedconfig.ScoringStrategy{Type: schedconfig.LeastAllocated},
		CacheResyncPeriodSeconds: defaultCacheResyncPeriodSeconds,
		Cache:                    cacheArgs,
	}

	prof := &schedCfg.Profiles[0]
	nrtPlugin := schedapi.Plugin{Name: noderesourcetopology.Name}
	prof.Plugins.Filter.Enabled = append(prof.Plugins.Filter.Enabled, nrtPlugin)
	prof.Plugins.Reserve.Enabled = append(prof.Plugins.Reserve.Enabled, nrtPlugin)
	prof.Plugins.Score.Enabled = append(prof.Plugins.Score.Enabled, nrtPlugin)
	prof.PluginConfig = append(prof.PluginConfig, schedapi.PluginConfig{
		Name: noderesourcetopology.Name,
		Args: &matchArgs,
	})

	return extTestContext{
		tctx:      tctx,
		cli:       kubeClient,
		extCli:    topoClient,
		cfg:       schedCfg,
		matchArgs: matchArgs,
	}
}

// waitForPodList polls, concurrently for each pod in podDescs, until every
// pod reaches the state declared by its expectedNode field (a specific node,
// anyNode, or "" meaning expected to stay pending) or until timeout expires.
// It returns the pods bound to a node, the pods that stayed pending, and a
// map of pod name -> error for pods that never reached the expected state.
func waitForPodList(t *testing.T, cs clientset.Interface, podDescs []podDesc, timeout time.Duration) ([]*corev1.Pod, []*corev1.Pod, map[string]error) {
	var lock sync.Mutex
	var scheduled []*corev1.Pod
	var pending []*corev1.Pod
	failed := make(map[string]error)

	var wg sync.WaitGroup
	for _, podDesc := range podDescs {
		wg.Add(1)
		go func(pod *corev1.Pod, expectedNode string) {
			defer wg.Done()

			var updatedPod *corev1.Pod
			err := wait.PollUntilContextTimeout(context.TODO(), 5*time.Second, timeout, false, func(ctx context.Context) (bool, error) {
				var nerr error
				updatedPod, nerr = cs.CoreV1().Pods(pod.Namespace).Get(ctx, pod.Name, metav1.GetOptions{})
				if nerr != nil {
					t.Logf("Failed to get pod %s/%s: %v", pod.Namespace, pod.Name, nerr)
					return false, nerr
				}

				return podMatchesExpectedNode(t, updatedPod.Namespace, updatedPod.Name, updatedPod.Spec.NodeName, expectedNode), nil
			})

			// TODO: channels would be nicer
			lock.Lock()
			defer lock.Unlock()
			if err != nil {
				// updatedPod is nil when the very first Get fails (or when
				// polling times out before any successful Get), so key the
				// failure on the original pod to avoid a nil dereference.
				// Get is by pod.Name, so the key is the same either way.
				failed[pod.Name] = err
				return
			}
			if expectedNode == "" {
				pending = append(pending, updatedPod)
			} else {
				scheduled = append(scheduled, updatedPod)
			}
		}(podDesc.pod, podDesc.expectedNode)
	}
	wg.Wait()
	return scheduled, pending, failed
}

// podMatchesExpectedNode reports whether the pod's current node matches the
// expectation: a specific node name, anyNode ("*", any non-empty binding), or
// "" (expected to stay pending). Every outcome is logged for debuggability.
func podMatchesExpectedNode(t *testing.T, podNamespace, podName, nodeName, expectedNode string) bool {
	switch {
	case expectedNode == nodeName:
		t.Logf("Pod %s/%s is on node %q as expected.", podNamespace, podName, nodeName)
		return true
	case expectedNode == anyNode:
		if nodeName != "" {
			t.Logf("Pod %s/%s is running, any node is fine (currently on %q)", podNamespace, podName, nodeName)
			return true
		}
		t.Logf("Pod %s/%s is expected to be bound to any node, but still pending", podNamespace, podName)
	case expectedNode == "":
		t.Logf("Pod %s/%s is expected to be pending, but found on node %q", podNamespace, podName, nodeName)
	default:
		t.Logf("Pod %s/%s is expected on node %q, but found on node %q", podNamespace, podName, expectedNode, nodeName)
	}
	return false
}

// mkPFP computes the pod fingerprint (PFP) for the given set of pods running
// on nodeName, logging the tracing status for debuggability, and returns the
// fingerprint signature.
func mkPFP(t *testing.T, nodeName string, pods ...*corev1.Pod) string {
	// named "status", not "st", to avoid shadowing the scheduler-testing
	// package imported as st.
	status := podfingerprint.MakeStatus(nodeName)
	fp := podfingerprint.NewTracingFingerprint(len(pods), &status)
	for _, pod := range pods {
		fp.AddPod(pod)
	}
	pfp := fp.Sign()
	t.Logf("PFP for %q: %s", nodeName, status.Repr())
	return pfp
}

// discardReservedSchedulerProfile returns a scheduler profile, named
// discardReservedSchedulerName, that enables the NodeResourceTopologyMatch
// plugin at the Filter, Score, Reserve and PostBind extension points with
// DiscardReservedNodes turned on, plus the default queue-sort and binder.
func discardReservedSchedulerProfile() schedapi.KubeSchedulerProfile {
	matchArgs := schedconfig.NodeResourceTopologyMatchArgs{
		ScoringStrategy:      schedconfig.ScoringStrategy{Type: schedconfig.LeastAllocated},
		DiscardReservedNodes: true,
	}

	// singlePlugin builds a PluginSet enabling just the named plugin; it
	// returns a fresh slice per call so the sets never share backing storage.
	singlePlugin := func(name string) schedapi.PluginSet {
		return schedapi.PluginSet{
			Enabled: []schedapi.Plugin{{Name: name}},
		}
	}

	return schedapi.KubeSchedulerProfile{
		SchedulerName: discardReservedSchedulerName,
		Plugins: &schedapi.Plugins{
			QueueSort: singlePlugin(queuesort.Name),
			Filter:    singlePlugin(noderesourcetopology.Name),
			Score:     singlePlugin(noderesourcetopology.Name),
			Reserve:   singlePlugin(noderesourcetopology.Name),
			PostBind:  singlePlugin(noderesourcetopology.Name),
			Bind:      singlePlugin(defaultbinder.Name),
		},
		PluginConfig: []schedapi.PluginConfig{
			{
				Name: noderesourcetopology.Name,
				Args: &matchArgs,
			},
		},
	}
}
